1
//--------------------------------------------------------------------------
3 // Copyright (c) Microsoft Corporation. All rights reserved.
5 // File: QueuedTaskScheduler.cs
7 //--------------------------------------------------------------------------
9 using System
.Collections
.Concurrent
;
10 using System
.Collections
.Generic
;
11 using System
.Diagnostics
;
14 namespace System
.Threading
.Tasks
.Schedulers
17 /// Provides a TaskScheduler that provides control over priorities, fairness, and the underlying threads utilized.
19 [DebuggerTypeProxy(typeof(QueuedTaskSchedulerDebugView
))]
20 [DebuggerDisplay("Id={Id}, Queues={DebugQueueCount}, ScheduledTasks = {DebugTaskCount}")]
21 public sealed class QueuedTaskScheduler
: TaskScheduler
, IDisposable
/// <summary>Debugger proxy that exposes the state of a QueuedTaskScheduler.</summary>
private class QueuedTaskSchedulerDebugView
{
    /// <summary>The scheduler being inspected.</summary>
    private QueuedTaskScheduler _scheduler;

    /// <summary>Creates a debug view over the supplied scheduler.</summary>
    /// <param name="scheduler">The scheduler to inspect.</param>
    /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
    public QueuedTaskSchedulerDebugView(QueuedTaskScheduler scheduler)
    {
        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }

        _scheduler = scheduler;
    }

    /// <summary>
    /// Gets a snapshot of the tasks queued directly to the scheduler.
    /// Null entries are skipped.
    /// </summary>
    public IEnumerable<Task> ScheduledTasks
    {
        get
        {
            // Pick whichever queue is in use: the plain queue when a target
            // scheduler is set, otherwise the blocking collection.
            IEnumerable<Task> source;
            if (_scheduler._targetScheduler != null)
            {
                source = _scheduler._nonthreadsafeTaskQueue;
            }
            else
            {
                source = _scheduler._blockingTaskQueue;
            }

            var snapshot = new List<Task>();
            foreach (Task queued in source)
            {
                if (queued != null) snapshot.Add(queued);
            }
            return snapshot;
        }
    }

    /// <summary>Gets every queue in every priority group, in priority order.</summary>
    public IEnumerable<TaskScheduler> Queues
    {
        get
        {
            var allQueues = new List<TaskScheduler>();
            foreach (QueueGroup priorityGroup in _scheduler._queueGroups.Values)
            {
                allQueues.AddRange(priorityGroup);
            }
            return allQueues;
        }
    }
}
62 /// A sorted list of round-robin queue lists. Tasks with the smallest priority value
63 /// are preferred. Priority groups are round-robin'd through in order of priority.
65 private readonly SortedList
<int, QueueGroup
> _queueGroups
= new SortedList
<int, QueueGroup
>();
66 /// <summary>Cancellation token used for disposal.</summary>
67 private readonly CancellationTokenSource _disposeCancellation
= new CancellationTokenSource();
69 /// The maximum allowed concurrency level of this scheduler. If custom threads are
70 /// used, this represents the number of created threads.
72 private readonly int _concurrencyLevel
;
73 /// <summary>Whether we're processing tasks on the current thread.</summary>
74 private static ThreadLocal
<bool> _taskProcessingThread
= new ThreadLocal
<bool>();
77 // *** For when using a target scheduler
80 /// <summary>The scheduler onto which actual work is scheduled.</summary>
81 private readonly TaskScheduler _targetScheduler
;
82 /// <summary>The queue of tasks to process when using an underlying target scheduler.</summary>
83 private readonly Queue
<Task
> _nonthreadsafeTaskQueue
;
/// <summary>The number of Tasks that have been queued or that are running while using an underlying scheduler.</summary>
85 private int _delegatesQueuedOrRunning
= 0;
88 // *** For when using our own threads
91 /// <summary>The threads used by the scheduler to process work.</summary>
92 private readonly Thread
[] _threads
;
93 /// <summary>The collection of tasks to be executed on our custom threads.</summary>
94 private readonly BlockingCollection
<Task
> _blockingTaskQueue
;
/// <summary>
/// Initializes the scheduler to run its work on <see cref="TaskScheduler.Default"/>,
/// passing 0 as the maximum concurrency level.
/// </summary>
public QueuedTaskScheduler()
    : this(TaskScheduler.Default, 0)
{
}
/// <summary>
/// Initializes the scheduler to run its work on the given target scheduler,
/// passing 0 as the maximum concurrency level.
/// </summary>
/// <param name="targetScheduler">The underlying scheduler onto which this scheduler's work is queued.</param>
public QueuedTaskScheduler(TaskScheduler targetScheduler)
    : this(targetScheduler, 0)
{
}
/// <summary>Initializes the scheduler so that its work runs on an underlying target scheduler.</summary>
/// <param name="targetScheduler">The target underlying scheduler onto which this scheduler's work is queued.</param>
/// <param name="maxConcurrencyLevel">
/// The maximum degree of concurrency allowed for this scheduler's work.
/// A value of 0 selects the number of logical processors.
/// </param>
/// <exception cref="ArgumentNullException"><paramref name="targetScheduler"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="maxConcurrencyLevel"/> is negative.</exception>
public QueuedTaskScheduler(
    TaskScheduler targetScheduler,
    int maxConcurrencyLevel)
{
    // Validate arguments. The names reported in the exceptions are the
    // actual parameter names of this constructor.
    if (targetScheduler == null) throw new ArgumentNullException("targetScheduler");
    if (maxConcurrencyLevel < 0) throw new ArgumentOutOfRangeException("maxConcurrencyLevel");

    // Initialize only those fields relevant to using an underlying scheduler. The
    // fields used for the custom-thread mode are deliberately left unset.
    _targetScheduler = targetScheduler;
    _nonthreadsafeTaskQueue = new Queue<Task>();

    // If 0, use the number of logical processors. But make sure whatever value we pick
    // is not greater than the degree of parallelism allowed by the underlying scheduler.
    _concurrencyLevel = maxConcurrencyLevel != 0 ? maxConcurrencyLevel : Environment.ProcessorCount;
    if (targetScheduler.MaximumConcurrencyLevel > 0 &&
        targetScheduler.MaximumConcurrencyLevel < _concurrencyLevel)
    {
        _concurrencyLevel = targetScheduler.MaximumConcurrencyLevel;
    }
}
/// <summary>
/// Initializes the scheduler to process work on its own threads: unnamed-prefix,
/// background, normal-priority, MTA threads with no init or cleanup routine.
/// </summary>
/// <param name="threadCount">The number of threads to create and use for processing work items.</param>
public QueuedTaskScheduler(int threadCount)
    : this(
        threadCount,
        string.Empty,
        false,
        ThreadPriority.Normal,
        ApartmentState.MTA,
        0,
        null,
        null)
{
}
/// <summary>Initializes the scheduler to process work on its own dedicated threads.</summary>
/// <param name="threadCount">
/// The number of threads to create and use for processing work items.
/// A value of 0 selects the number of logical processors.
/// </param>
/// <param name="threadName">The name to use for each of the created threads; the thread index is appended. Null leaves the threads unnamed.</param>
/// <param name="useForegroundThreads">A Boolean value that indicates whether to use foreground threads instead of background.</param>
/// <param name="threadPriority">The priority to assign to each thread.</param>
/// <param name="threadApartmentState">The apartment state to use for each thread.</param>
/// <param name="threadMaxStackSize">The stack size to use for each thread.</param>
/// <param name="threadInit">An initialization routine to run on each thread.</param>
/// <param name="threadFinally">A finalization routine to run on each thread.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="threadCount"/> is negative.</exception>
public QueuedTaskScheduler(
    int threadCount,
    string threadName = "",
    bool useForegroundThreads = false,
    ThreadPriority threadPriority = ThreadPriority.Normal,
    ApartmentState threadApartmentState = ApartmentState.MTA,
    int threadMaxStackSize = 0,
    Action threadInit = null,
    Action threadFinally = null)
{
    // Validates arguments (some validation is left up to the Thread type itself).
    // If the thread count is 0, default to the number of logical processors.
    if (threadCount < 0) throw new ArgumentOutOfRangeException("threadCount");
    _concurrencyLevel = threadCount == 0 ? Environment.ProcessorCount : threadCount;

    // Initialize the queue used for storing tasks
    _blockingTaskQueue = new BlockingCollection<Task>();

    // Create all of the threads. Size the pool from the resolved concurrency level,
    // not the raw argument: with threadCount == 0 the raw value would create no
    // threads at all and queued tasks would never run.
    _threads = new Thread[_concurrencyLevel];
    for (int i = 0; i < _threads.Length; i++)
    {
        _threads[i] = new Thread(() => ThreadBasedDispatchLoop(threadInit, threadFinally), threadMaxStackSize)
        {
            Priority = threadPriority,
            IsBackground = !useForegroundThreads,
        };
        if (threadName != null) _threads[i].Name = threadName + " (" + i + ")";
        _threads[i].SetApartmentState(threadApartmentState);
    }

    // Start all of the threads
    foreach (var thread in _threads) thread.Start();
}
/// <summary>The dispatch loop run by every dedicated thread of this scheduler.</summary>
/// <param name="threadInit">An initialization routine to run when the thread begins; may be null.</param>
/// <param name="threadFinally">A finalization routine to run before the thread ends; may be null.</param>
private void ThreadBasedDispatchLoop(Action threadInit, Action threadFinally)
{
    _taskProcessingThread.Value = true;
    if (threadInit != null) threadInit();
    try
    {
        // Disposal cancels the token, which surfaces here as an
        // OperationCanceledException. It ends the loop and is swallowed below.
        try
        {
            // Keep consuming; a thread abort that we reset brings us back here.
            for (;;)
            {
                try
                {
                    foreach (Task queued in _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token))
                    {
                        // A non-null entry was queued to this scheduler directly.
                        if (queued != null)
                        {
                            TryExecuteTask(queued);
                            continue;
                        }

                        // A null entry is a placeholder for a task sitting in one of the
                        // sub-queues. Pick the next one by priority and fairness.
                        Task next;
                        QueuedTaskSchedulerQueue owner;
                        lock (_queueGroups) FindNextTask_NeedsLock(out next, out owner);

                        if (next != null) owner.ExecuteTask(next);
                    }
                }
                catch (ThreadAbortException)
                {
                    // Let aborts caused by shutdown or unload pass through.
                    // Reset any other abort so work can continue.
                    bool tearingDown =
                        Environment.HasShutdownStarted ||
                        AppDomain.CurrentDomain.IsFinalizingForUnload();
                    if (!tearingDown)
                    {
                        Thread.ResetAbort();
                    }
                }
            }
        }
        catch (OperationCanceledException) { }
    }
    finally
    {
        // Run the cleanup routine, if any, and clear the per-thread flag.
        if (threadFinally != null) threadFinally();
        _taskProcessingThread.Value = false;
    }
}
/// <summary>Gets the total number of queues across all priority groups.</summary>
private int DebugQueueCount
{
    get
    {
        int total = 0;
        foreach (QueueGroup priorityGroup in _queueGroups.Values)
        {
            total += priorityGroup.Count;
        }
        return total;
    }
}
/// <summary>Gets the number of non-null tasks currently held in the active top-level queue.</summary>
private int DebugTaskCount
{
    get
    {
        IEnumerable<Task> queued;
        if (_targetScheduler != null)
        {
            queued = _nonthreadsafeTaskQueue;
        }
        else
        {
            queued = _blockingTaskQueue;
        }

        return queued.Count(t => t != null);
    }
}
/// <summary>Finds the next task that should be executed, based on priorities and fairness.</summary>
/// <param name="targetTask">The found task, or null if none was found.</param>
/// <param name="queueForTargetTask">
/// The queue the found task was taken from, or null if no task was found.
/// Due to security checks inside of TPL, this scheduler needs to be used to execute that task.
/// </param>
/// <remarks>The caller must hold the lock on <c>_queueGroups</c>.</remarks>
private void FindNextTask_NeedsLock(out Task targetTask, out QueuedTaskSchedulerQueue queueForTargetTask)
{
    targetTask = null;
    queueForTargetTask = null;

    // Look through each of our queue groups in sorted order.
    // This ordering is based on the priority of the queues.
    foreach (var queueGroup in _queueGroups)
    {
        var queues = queueGroup.Value;

        // Within each group, iterate through the queues in a round-robin
        // fashion. Every time we iterate again and successfully find a task,
        // we'll start in the next location in the group.
        foreach (int i in queues.CreateSearchOrder())
        {
            var candidate = queues[i];
            var items = candidate._workItems;
            if (items.Count == 0) continue;

            targetTask = items.Dequeue();
            queueForTargetTask = candidate;

            // Work out the next round-robin start from the state before any removal.
            int countBefore = queues.Count;
            int start = queues.NextQueueIndex;
            if (start < 0 || start >= countBefore) start = 0;
            int next = start + 1;

            // A disposed queue is dropped once its last item has been taken.
            if (candidate._disposed && items.Count == 0)
            {
                RemoveQueue_NeedsLock(candidate);

                // Entries after the removed slot moved down by one.
                if (queues.Count < countBefore && next > i) next--;
            }

            // The removal may have emptied the group. Taking the remainder
            // by zero would throw, so fall back to index 0 in that case.
            int remaining = queues.Count;
            queues.NextQueueIndex = remaining > 0 ? next % remaining : 0;
            return;
        }
    }
}
/// <summary>Queues a task to the scheduler.</summary>
/// <param name="task">The task to be queued. Null is used as a placeholder for work in a sub-queue.</param>
/// <exception cref="ObjectDisposedException">The scheduler has been disposed.</exception>
protected override void QueueTask(Task task)
{
    // Nothing may be queued once disposal has been requested.
    if (_disposeCancellation.IsCancellationRequested)
    {
        throw new ObjectDisposedException(GetType().Name);
    }

    // No target scheduler means we run on our own threads:
    // hand the task to the blocking queue and we're done.
    if (_targetScheduler == null)
    {
        _blockingTaskQueue.Add(task);
        return;
    }

    // Otherwise enqueue on the plain queue and decide, under the same lock,
    // whether another processing task is allowed. The counter is bumped here
    // so that concurrent callers don't start too many.
    bool startWorker;
    lock (_nonthreadsafeTaskQueue)
    {
        _nonthreadsafeTaskQueue.Enqueue(task);
        startWorker = _delegatesQueuedOrRunning < _concurrencyLevel;
        if (startWorker)
        {
            _delegatesQueuedOrRunning++;
        }
    }

    // If necessary, start processing asynchronously on the target scheduler.
    if (startWorker)
    {
        Task.Factory.StartNew(
            ProcessPrioritizedAndBatchedTasks,
            CancellationToken.None,
            TaskCreationOptions.None,
            _targetScheduler);
    }
}
/// <summary>
/// Processes tasks one at a time in the best order.
/// This should be run in a Task generated by QueueTask.
/// It's been separated out into its own method to show up better in Parallel Tasks.
/// </summary>
private void ProcessPrioritizedAndBatchedTasks()
{
    bool queueDrained = false;
    while (!queueDrained && !_disposeCancellation.IsCancellationRequested)
    {
        try
        {
            // Mark this thread as one that is processing our tasks.
            _taskProcessingThread.Value = true;

            // Keep going until the queue is empty or disposal is requested.
            while (!_disposeCancellation.IsCancellationRequested)
            {
                // Take the next entry. An empty queue ends this pass.
                Task next;
                lock (_nonthreadsafeTaskQueue)
                {
                    if (_nonthreadsafeTaskQueue.Count == 0) break;
                    next = _nonthreadsafeTaskQueue.Dequeue();
                }

                // A null entry is a placeholder for a task in the round-robin
                // queues. Look up the one that should run next.
                QueuedTaskSchedulerQueue owner = null;
                if (next == null)
                {
                    lock (_queueGroups) FindNextTask_NeedsLock(out next, out owner);
                }

                if (next == null) continue;

                // A task that came from a round-robin queue has to be
                // executed through that queue.
                if (owner != null)
                {
                    owner.ExecuteTask(next);
                }
                else
                {
                    TryExecuteTask(next);
                }
            }
        }
        finally
        {
            // Re-check under the lock that there is really nothing left. Only then
            // give up this processing slot and clear the per-thread flag.
            lock (_nonthreadsafeTaskQueue)
            {
                if (_nonthreadsafeTaskQueue.Count == 0)
                {
                    _delegatesQueuedOrRunning--;
                    queueDrained = true;
                    _taskProcessingThread.Value = false;
                }
            }
        }
    }
}
/// <summary>
/// Notifies the pool that there's a new item to be executed in one of the
/// round-robin queues, by queueing a null placeholder.
/// </summary>
private void NotifyNewWorkItem()
{
    QueueTask(null);
}
/// <summary>Tries to execute a task synchronously on the current thread.</summary>
/// <param name="task">The task to execute.</param>
/// <param name="taskWasPreviouslyQueued">Whether the task was previously queued. Not consulted by this implementation.</param>
/// <returns>true if the task was executed; otherwise, false.</returns>
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
    // Inlining is only allowed on a thread that is already running our tasks.
    if (!_taskProcessingThread.Value)
    {
        return false;
    }

    return TryExecuteTask(task);
}
/// <summary>Gets the tasks scheduled to this scheduler.</summary>
/// <returns>A snapshot list of all non-null tasks queued to this scheduler.</returns>
/// <remarks>This does not include the tasks on sub-schedulers. Those will be retrieved by the debugger separately.</remarks>
protected override IEnumerable<Task> GetScheduledTasks()
{
    // Our own threads read from the blocking queue. A target scheduler
    // means the plain queue is in use instead.
    IEnumerable<Task> pending = _targetScheduler == null
        ? (IEnumerable<Task>)_blockingTaskQueue
        : (IEnumerable<Task>)_nonthreadsafeTaskQueue;

    // Nulls are just placeholders for tasks in sub-schedulers; leave them out.
    return pending.Where(t => t != null).ToList();
}
/// <summary>Gets the maximum concurrency level to use when processing tasks.</summary>
public override int MaximumConcurrencyLevel
{
    get
    {
        return _concurrencyLevel;
    }
}
/// <summary>Initiates shutdown of the scheduler by cancelling its disposal token.</summary>
public void Dispose()
{
    _disposeCancellation.Cancel();
}
/// <summary>Creates and activates a new scheduling queue for this scheduler.</summary>
/// <returns>The newly created and activated queue at priority 0.</returns>
public TaskScheduler ActivateNewQueue()
{
    return ActivateNewQueue(0);
}
/// <summary>Creates and activates a new scheduling queue for this scheduler.</summary>
/// <param name="priority">The priority level for the new queue.</param>
/// <returns>The newly created and activated queue at the specified priority.</returns>
public TaskScheduler ActivateNewQueue(int priority)
{
    var newQueue = new QueuedTaskSchedulerQueue(priority, this);

    // Register the queue with the group for its priority,
    // creating that group on first use.
    lock (_queueGroups)
    {
        QueueGroup siblings;
        bool groupExists = _queueGroups.TryGetValue(priority, out siblings);
        if (!groupExists)
        {
            siblings = new QueueGroup();
            _queueGroups.Add(priority, siblings);
        }

        siblings.Add(newQueue);
    }

    return newQueue;
}
/// <summary>Removes a queue from its priority group.</summary>
/// <param name="queue">The queue to be removed.</param>
/// <remarks>
/// The caller must hold the lock on <c>_queueGroups</c>.
/// Does nothing if the queue is not in its group.
/// </remarks>
private void RemoveQueue_NeedsLock(QueuedTaskSchedulerQueue queue)
{
    // Find the group that contains the queue and the queue's index within the group
    var queueGroup = _queueGroups[queue._priority];
    int index = queueGroup.IndexOf(queue);

    // Not in the group: leave the round-robin position alone and
    // do not call RemoveAt with -1.
    if (index < 0) return;

    // We're about to remove the queue, so adjust the index of the next
    // round-robin starting location if it'll be affected by the removal
    if (queueGroup.NextQueueIndex >= index) queueGroup.NextQueueIndex--;

    queueGroup.RemoveAt(index);

    // Removing slot 0 while the start was also 0 leaves -1, which is not a
    // valid list index. Reset any out-of-range start to the front.
    if (queueGroup.NextQueueIndex < 0 || queueGroup.NextQueueIndex >= queueGroup.Count)
    {
        queueGroup.NextQueueIndex = 0;
    }
}
/// <summary>A group of queues at the same priority level.</summary>
private class QueueGroup : List<QueuedTaskSchedulerQueue>
{
    /// <summary>The starting index for the next round-robin traversal.</summary>
    public int NextQueueIndex = 0;

    /// <summary>Creates a search order through this group.</summary>
    /// <returns>
    /// Every valid index of this group exactly once, beginning at
    /// <see cref="NextQueueIndex"/> and wrapping around to the front.
    /// </returns>
    public IEnumerable<int> CreateSearchOrder()
    {
        int count = Count;

        // NextQueueIndex is a public field and can be left outside [0, count)
        // after a queue is removed. Start from the front in that case rather
        // than yielding an index the list does not have.
        int start = NextQueueIndex;
        if (start < 0 || start >= count) start = 0;

        for (int i = start; i < count; i++) yield return i;
        for (int i = 0; i < start; i++) yield return i;
    }
}
/// <summary>Provides a scheduling queue associated with a QueuedTaskScheduler.</summary>
508 [DebuggerDisplay("QueuePriority = {_priority}, WaitingTasks = {WaitingTasks}")]
509 [DebuggerTypeProxy(typeof(QueuedTaskSchedulerQueueDebugView
))]
510 private sealed class QueuedTaskSchedulerQueue
: TaskScheduler
, IDisposable
/// <summary>Debugger proxy for a single scheduling queue.</summary>
private sealed class QueuedTaskSchedulerQueueDebugView
{
    /// <summary>The queue being inspected.</summary>
    private readonly QueuedTaskSchedulerQueue _queue;

    /// <summary>Creates a debug view over the supplied queue.</summary>
    /// <param name="queue">The queue to be debugged.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queue"/> is null.</exception>
    public QueuedTaskSchedulerQueueDebugView(QueuedTaskSchedulerQueue queue)
    {
        if (queue == null)
        {
            throw new ArgumentNullException("queue");
        }

        _queue = queue;
    }

    /// <summary>Gets the priority of this queue in its associated scheduler.</summary>
    public int Priority
    {
        get { return _queue._priority; }
    }

    /// <summary>Gets the ID of this scheduler.</summary>
    public int Id
    {
        get { return _queue.Id; }
    }

    /// <summary>Gets all of the tasks scheduled to this queue.</summary>
    public IEnumerable<Task> ScheduledTasks
    {
        get { return _queue.GetScheduledTasks(); }
    }

    /// <summary>Gets the QueuedTaskScheduler with which this queue is associated.</summary>
    public QueuedTaskScheduler AssociatedScheduler
    {
        get { return _queue._pool; }
    }
}
536 /// <summary>The scheduler with which this pool is associated.</summary>
537 private readonly QueuedTaskScheduler _pool
;
538 /// <summary>The work items stored in this queue.</summary>
539 internal readonly Queue
<Task
> _workItems
;
540 /// <summary>Whether this queue has been disposed.</summary>
541 internal bool _disposed
;
542 /// <summary>Gets the priority for this queue.</summary>
543 internal int _priority
;
/// <summary>Initializes the queue with an empty work-item list.</summary>
/// <param name="priority">The priority associated with this queue.</param>
/// <param name="pool">The scheduler with which this queue is associated.</param>
internal QueuedTaskSchedulerQueue(int priority, QueuedTaskScheduler pool)
{
    _workItems = new Queue<Task>();
    _pool = pool;
    _priority = priority;
}
/// <summary>Gets the number of tasks waiting in this queue.</summary>
internal int WaitingTasks
{
    get
    {
        return _workItems.Count;
    }
}
/// <summary>Gets the tasks scheduled to this queue.</summary>
/// <returns>A new list containing all tasks currently queued to this queue.</returns>
protected override IEnumerable<Task> GetScheduledTasks()
{
    return new List<Task>(_workItems);
}
/// <summary>Queues a task to this queue and tells the parent scheduler about it.</summary>
/// <param name="task">The task to be queued.</param>
/// <exception cref="ObjectDisposedException">This queue has been disposed.</exception>
protected override void QueueTask(Task task)
{
    if (_disposed)
    {
        throw new ObjectDisposedException(GetType().Name);
    }

    // Store the task locally, under the parent's queue-group lock...
    lock (_pool._queueGroups)
    {
        _workItems.Enqueue(task);
    }

    // ...then notify the parent scheduler that there's work available.
    _pool.NotifyNewWorkItem();
}
/// <summary>Tries to execute a task synchronously on the current thread.</summary>
/// <param name="task">The task to execute.</param>
/// <param name="taskWasPreviouslyQueued">Whether the task was previously queued. Not consulted by this implementation.</param>
/// <returns>true if the task was executed; otherwise, false.</returns>
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
    // Only run inline when the current thread is flagged as one that is
    // processing tasks for the scheduler.
    if (!_taskProcessingThread.Value)
    {
        return false;
    }

    return TryExecuteTask(task);
}
/// <summary>Runs the specified task through this queue's scheduler.</summary>
/// <param name="task">The task to execute.</param>
internal void ExecuteTask(Task task)
{
    TryExecuteTask(task);
}
/// <summary>Gets the maximum concurrency level to use when processing tasks, as reported by the parent scheduler.</summary>
public override int MaximumConcurrencyLevel
{
    get
    {
        return _pool.MaximumConcurrencyLevel;
    }
}
592 /// <summary>Signals that the queue should be removed from the scheduler as soon as the queue is empty.</summary>
593 public void Dispose()
597 lock (_pool
._queueGroups
)
599 // We only remove the queue if it's empty. If it's not empty,
600 // we still mark it as disposed, and the associated QueuedTaskScheduler
601 // will remove the queue when its count hits 0 and its _disposed is true.
602 if (_workItems
.Count
== 0)
604 _pool
.RemoveQueue_NeedsLock(this);